---
title: Custom storage
description: Extending the ChatStorage class to create custom storage options in the Multi-Agent Orchestrator System
---

The Multi-Agent Orchestrator System provides flexibility in how conversation data is stored through its abstract `ChatStorage` class. This guide shows how to create a custom storage solution by extending this class.

## Understanding the ChatStorage Abstract Class

The `ChatStorage` class defines the interface for all storage solutions in the system. It declares three abstract methods that every storage class must implement and provides two helper methods:

import { Tabs, TabItem } from '@astrojs/starlight/components';

<Tabs syncKey="runtime">
  <TabItem label="TypeScript" icon="seti:typescript" color="blue">
    ```typescript
    import { ConversationMessage } from "../types";

    export abstract class ChatStorage {
      protected isConsecutiveMessage(conversation: ConversationMessage[], newMessage: ConversationMessage): boolean {
        if (conversation.length === 0) return false;
        const lastMessage = conversation[conversation.length - 1];
        return lastMessage.role === newMessage.role;
      }

      protected trimConversation(conversation: ConversationMessage[], maxHistorySize?: number): ConversationMessage[] {
        if (maxHistorySize === undefined) return conversation;
        // Round maxHistorySize down to an even number to keep complete user/assistant pairs
        const adjustedMaxHistorySize = maxHistorySize % 2 === 0 ? maxHistorySize : maxHistorySize - 1;
        return conversation.slice(-adjustedMaxHistorySize);
      }

      abstract saveChatMessage(
        userId: string,
        sessionId: string,
        agentId: string,
        newMessage: ConversationMessage,
        maxHistorySize?: number
      ): Promise<ConversationMessage[]>;

      abstract fetchChat(
        userId: string,
        sessionId: string,
        agentId: string,
        maxHistorySize?: number
      ): Promise<ConversationMessage[]>;

      abstract fetchAllChats(
        userId: string,
        sessionId: string
      ): Promise<ConversationMessage[]>;
    }
    ```
  </TabItem>
  <TabItem label="Python" icon="seti:python">
    ```python
    from abc import ABC, abstractmethod
    from typing import List, Optional
    from multi_agent_orchestrator.types import ConversationMessage

    class ChatStorage(ABC):
        def is_consecutive_message(self, conversation: List[ConversationMessage], new_message: ConversationMessage) -> bool:
            if not conversation:
                return False
            last_message = conversation[-1]
            return last_message.role == new_message.role

        def trim_conversation(self, conversation: List[ConversationMessage], max_history_size: Optional[int] = None) -> List[ConversationMessage]:
            if max_history_size is None:
                return conversation
            # Round max_history_size down to an even number to keep complete user/assistant pairs
            adjusted_max_history_size = max_history_size if max_history_size % 2 == 0 else max_history_size - 1
            return conversation[-adjusted_max_history_size:]

        @abstractmethod
        async def save_chat_message(
            self,
            user_id: str,
            session_id: str,
            agent_id: str,
            new_message: ConversationMessage,
            max_history_size: Optional[int] = None
        ) -> List[ConversationMessage]:
            pass

        @abstractmethod
        async def fetch_chat(
            self,
            user_id: str,
            session_id: str,
            agent_id: str,
            max_history_size: Optional[int] = None
        ) -> List[ConversationMessage]:
            pass

        @abstractmethod
        async def fetch_all_chats(
            self,
            user_id: str,
            session_id: str
        ) -> List[ConversationMessage]:
            pass
    ```
  </TabItem>
</Tabs>

The two helper methods are:

1. `isConsecutiveMessage` (TypeScript) / `is_consecutive_message` (Python): Checks whether a new message has the same role as the last message in the conversation.
2. `trimConversation` (TypeScript) / `trim_conversation` (Python): Trims the conversation history to the specified maximum size, rounding an odd size down so that the number of messages stays even.
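
To make the helper behavior concrete, here is a minimal standalone sketch. The `Message` dataclass is a hypothetical stand-in for `ConversationMessage`, and the two functions mirror the helper logic shown above as free functions so the snippet runs without the library installed.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class Message:
    """Hypothetical stand-in for ConversationMessage (a role plus content blocks)."""
    role: str
    content: List[Dict[str, str]] = field(default_factory=list)


def is_consecutive_message(conversation: List[Message], new_message: Message) -> bool:
    # True when the new message has the same role as the last stored message.
    if not conversation:
        return False
    return conversation[-1].role == new_message.role


def trim_conversation(conversation: List[Message], max_history_size: Optional[int] = None) -> List[Message]:
    if max_history_size is None:
        return conversation
    # Round odd sizes down so only complete user/assistant pairs remain.
    adjusted = max_history_size if max_history_size % 2 == 0 else max_history_size - 1
    return conversation[-adjusted:]


history = [
    Message("user", [{"text": "Hi"}]),
    Message("assistant", [{"text": "Hello"}]),
    Message("user", [{"text": "What is the weather?"}]),
    Message("assistant", [{"text": "Sunny"}]),
    Message("user", [{"text": "Thanks"}]),
    Message("assistant", [{"text": "You're welcome"}]),
]

# The last stored message is from the assistant, so another assistant message is consecutive.
print(is_consecutive_message(history, Message("assistant", [{"text": "Anything else?"}])))  # True
print(is_consecutive_message(history, Message("user", [{"text": "One more thing"}])))  # False

# A limit of 5 is rounded down to 4, which keeps the last two complete pairs.
print(len(trim_conversation(history, 5)))  # 4
```

One edge case follows from the logic as written: a limit of 1 is adjusted to 0, and slicing with `-0` returns the whole list rather than an empty one.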

The three main abstract methods are:

1. `saveChatMessage` (TypeScript) / `save_chat_message` (Python): Saves a new message for a given user, session, and agent, and returns the resulting conversation.
2. `fetchChat` (TypeScript) / `fetch_chat` (Python): Retrieves the messages exchanged with a specific agent in a user's session.
3. `fetchAllChats` (TypeScript) / `fetch_all_chats` (Python): Retrieves all messages for a user's session, across all agents.

## Creating a Custom Storage Solution

To create a custom storage solution, follow these steps:

1. Create a new class that extends `ChatStorage`.
2. Implement all the abstract methods.
3. Use the helper methods `isConsecutiveMessage` / `is_consecutive_message` and `trimConversation` / `trim_conversation` in your implementation.
4. Add any additional methods or properties specific to your storage solution.

<hr/>
> **Important**
> When implementing `fetchAllChats`, prefix the message text with the agent ID in square brackets whenever the role is ASSISTANT, so that each response can be traced back to its agent:

```text
ASSISTANT: [agent-a] Response from agent A
USER: Some user input  
ASSISTANT: [agent-b] Response from agent B
```
<hr/>
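
The prefixing rule can be sketched in isolation as follows. This is an illustration under assumptions, not the library's implementation: plain dictionaries stand in for `ConversationMessage` objects, the role strings and agent ID are made up, and `tag_with_agent_id` is a hypothetical helper name.

```python
from typing import Any, Dict, List

# Plain dicts stand in for ConversationMessage objects (assumption for this sketch).
Message = Dict[str, Any]


def tag_with_agent_id(agent_id: str, messages: List[Message]) -> List[Message]:
    """Return copies of the messages with assistant text prefixed by [agent_id]."""
    tagged = []
    for message in messages:
        # Copy each content block so the stored history is never modified.
        content = [dict(block) for block in message["content"]]
        if content and message["role"] == "assistant":
            content[0]["text"] = f"[{agent_id}] {content[0]['text']}"
        tagged.append({"role": message["role"], "content": content})
    return tagged


stored = [
    {"role": "user", "content": [{"text": "Some user input"}]},
    {"role": "assistant", "content": [{"text": "Response from agent A"}]},
]

for message in tag_with_agent_id("agent-a", stored):
    print(f"{message['role'].upper()}: {message['content'][0]['text']}")
# USER: Some user input
# ASSISTANT: [agent-a] Response from agent A
```

Working on copies matters here: if the prefix were written into the stored messages, every later call would add another `[agent-a]` in front of the same text.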


Here's an example of a simple custom storage solution using an in-memory dictionary:

<Tabs syncKey="runtime">
  <TabItem label="TypeScript" icon="seti:typescript" color="blue">
    ```typescript
    import { ChatStorage, ConversationMessage, ParticipantRole } from 'multi-agent-orchestrator';

    class SimpleInMemoryStorage extends ChatStorage {
      // Conversations keyed by "userId:sessionId:agentId"
      private storage: { [key: string]: ConversationMessage[] } = {};

      async saveChatMessage(
        userId: string,
        sessionId: string,
        agentId: string,
        newMessage: ConversationMessage,
        maxHistorySize?: number
      ): Promise<ConversationMessage[]> {
        const key = `${userId}:${sessionId}:${agentId}`;
        if (!this.storage[key]) {
          this.storage[key] = [];
        }

        // Skip messages that repeat the role of the last stored message
        if (!this.isConsecutiveMessage(this.storage[key], newMessage)) {
          this.storage[key].push(newMessage);
        }

        this.storage[key] = this.trimConversation(this.storage[key], maxHistorySize);
        return this.storage[key];
      }

      async fetchChat(
        userId: string,
        sessionId: string,
        agentId: string,
        maxHistorySize?: number
      ): Promise<ConversationMessage[]> {
        const key = `${userId}:${sessionId}:${agentId}`;
        const conversation = this.storage[key] || [];
        return this.trimConversation(conversation, maxHistorySize);
      }

      async fetchAllChats(
        userId: string,
        sessionId: string
      ): Promise<ConversationMessage[]> {
        const allMessages: ConversationMessage[] = [];
        // The trailing colon keeps session "abc" from also matching "abc1"
        const prefix = `${userId}:${sessionId}:`;
        for (const key in this.storage) {
          if (key.startsWith(prefix)) {
            const agentId = key.split(':')[2];
            for (const message of this.storage[key]) {
              // Copy the content so the stored message is not modified
              const newContent = message.content ? [...message.content] : [];
              if (newContent.length > 0 && message.role === ParticipantRole.ASSISTANT) {
                newContent[0] = { text: `[${agentId}] ${newContent[0].text}` };
              }
              allMessages.push({
                ...message,
                content: newContent
              });
            }
          }
        }
        return allMessages;
      }
    }
    ```
  </TabItem>
  <TabItem label="Python" icon="seti:python">
    ```python
    from typing import List, Optional, Dict
    from multi_agent_orchestrator.storage import ChatStorage
    from multi_agent_orchestrator.types import ConversationMessage, ParticipantRole

    class SimpleInMemoryStorage(ChatStorage):
        def __init__(self):
            # Conversations keyed by "user_id:session_id:agent_id"
            self.storage: Dict[str, List[ConversationMessage]] = {}

        async def save_chat_message(
            self,
            user_id: str,
            session_id: str,
            agent_id: str,
            new_message: ConversationMessage,
            max_history_size: Optional[int] = None
        ) -> List[ConversationMessage]:
            key = f"{user_id}:{session_id}:{agent_id}"
            if key not in self.storage:
                self.storage[key] = []

            # Skip messages that repeat the role of the last stored message
            if not self.is_consecutive_message(self.storage[key], new_message):
                self.storage[key].append(new_message)

            self.storage[key] = self.trim_conversation(self.storage[key], max_history_size)
            return self.storage[key]

        async def fetch_chat(
            self,
            user_id: str,
            session_id: str,
            agent_id: str,
            max_history_size: Optional[int] = None
        ) -> List[ConversationMessage]:
            key = f"{user_id}:{session_id}:{agent_id}"
            conversation = self.storage.get(key, [])
            return self.trim_conversation(conversation, max_history_size)

        async def fetch_all_chats(
            self,
            user_id: str,
            session_id: str
        ) -> List[ConversationMessage]:
            all_messages = []
            # The trailing colon keeps session "abc" from also matching "abc1"
            prefix = f"{user_id}:{session_id}:"
            for key, messages in self.storage.items():
                if key.startswith(prefix):
                    agent_id = key.split(':')[2]
                    for message in messages:
                        # Copy the content so the stored message is not modified
                        new_content = list(message.content) if message.content else []
                        if len(new_content) > 0 and message.role == ParticipantRole.ASSISTANT:
                            new_content[0] = {'text': f"[{agent_id}] {new_content[0]['text']}"}
                        all_messages.append(
                            ConversationMessage(
                                role=message.role,
                                content=new_content
                            )
                        )
            # Stable sort: messages without a timestamp attribute keep their insertion order
            return sorted(all_messages, key=lambda m: getattr(m, 'timestamp', 0))
    ```

  </TabItem>
</Tabs>

## Using Your Custom Storage

To use your custom storage with the Multi-Agent Orchestrator:

<Tabs syncKey="runtime">
  <TabItem label="TypeScript" icon="seti:typescript" color="blue">
    ```typescript
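    import { MultiAgentOrchestrator } from 'multi-agent-orchestrator';

    // SimpleInMemoryStorage is the class defined in the example above
    const customStorage = new SimpleInMemoryStorage();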
    const orchestrator = new MultiAgentOrchestrator({
      storage: customStorage
    });
    ```
  </TabItem>
  <TabItem label="Python" icon="seti:python">
    ```python
    from multi_agent_orchestrator.orchestrator import MultiAgentOrchestrator
    from your_custom_storage_module import SimpleInMemoryStorage

    custom_storage = SimpleInMemoryStorage()
    orchestrator = MultiAgentOrchestrator(storage=custom_storage)
    ```
  </TabItem>
</Tabs>

By extending the `ChatStorage` class, you can create custom storage solutions tailored to your specific needs, whether it's integrating with a particular database system, implementing caching mechanisms, or adapting to unique architectural requirements.

For production use, consider scalability, persistence, and error handling when implementing your custom storage solution. The helper methods `isConsecutiveMessage` / `is_consecutive_message` and `trimConversation` / `trim_conversation` take care of the common history rules, so your implementation can focus on reading and writing data.
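
As an illustration of a persistent backend, the following sketch stores messages in SQLite using only the Python standard library. It is a minimal sketch under several assumptions: plain dictionaries stand in for `ConversationMessage`, the class does not inherit from `ChatStorage` so that the snippet runs on its own (a real implementation would extend it and reuse the inherited helpers), and the table layout and the name `SQLiteChatStorage` are hypothetical.

```python
import asyncio
import json
import sqlite3
from typing import Any, Dict, List, Optional

Message = Dict[str, Any]  # stand-in for ConversationMessage: {"role": ..., "content": [...]}


class SQLiteChatStorage:
    """Sketch only: a real implementation would extend ChatStorage."""

    def __init__(self, path: str = ":memory:") -> None:
        self.conn = sqlite3.connect(path)
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS messages ("
            "id INTEGER PRIMARY KEY AUTOINCREMENT, "
            "user_id TEXT, session_id TEXT, agent_id TEXT, role TEXT, content TEXT)"
        )

    @staticmethod
    def _trim(messages: List[Message], max_history_size: Optional[int]) -> List[Message]:
        # Same even-size rounding as trim_conversation; an adjusted size of 0 yields an empty list.
        if max_history_size is None:
            return messages
        adjusted = max_history_size - (max_history_size % 2)
        return messages[-adjusted:] if adjusted > 0 else []

    async def save_chat_message(self, user_id: str, session_id: str, agent_id: str,
                                new_message: Message,
                                max_history_size: Optional[int] = None) -> List[Message]:
        history = await self.fetch_chat(user_id, session_id, agent_id)
        # Skip messages that repeat the role of the last stored message.
        if not history or history[-1]["role"] != new_message["role"]:
            with self.conn:  # commits on success, rolls back on error
                self.conn.execute(
                    "INSERT INTO messages (user_id, session_id, agent_id, role, content) "
                    "VALUES (?, ?, ?, ?, ?)",
                    (user_id, session_id, agent_id, new_message["role"],
                     json.dumps(new_message["content"])),
                )
        # Unlike the in-memory example, old rows stay in the table; only the result is trimmed.
        return await self.fetch_chat(user_id, session_id, agent_id, max_history_size)

    async def fetch_chat(self, user_id: str, session_id: str, agent_id: str,
                         max_history_size: Optional[int] = None) -> List[Message]:
        rows = self.conn.execute(
            "SELECT role, content FROM messages "
            "WHERE user_id = ? AND session_id = ? AND agent_id = ? ORDER BY id",
            (user_id, session_id, agent_id),
        ).fetchall()
        messages = [{"role": role, "content": json.loads(content)} for role, content in rows]
        return self._trim(messages, max_history_size)

    async def fetch_all_chats(self, user_id: str, session_id: str) -> List[Message]:
        rows = self.conn.execute(
            "SELECT agent_id, role, content FROM messages "
            "WHERE user_id = ? AND session_id = ? ORDER BY id",
            (user_id, session_id),
        ).fetchall()
        messages = []
        for agent_id, role, content in rows:
            blocks = json.loads(content)
            # Prefix assistant text with the agent ID, as required for fetch_all_chats.
            if blocks and role == "assistant":
                blocks[0]["text"] = f"[{agent_id}] {blocks[0]['text']}"
            messages.append({"role": role, "content": blocks})
        return messages


async def main() -> List[Message]:
    storage = SQLiteChatStorage()  # in-memory database for the demo
    await storage.save_chat_message("u1", "s1", "agent-a", {"role": "user", "content": [{"text": "Hi"}]})
    await storage.save_chat_message("u1", "s1", "agent-a", {"role": "assistant", "content": [{"text": "Hello"}]})
    return await storage.fetch_all_chats("u1", "s1")


print(asyncio.run(main()))
```

The `sqlite3` calls here are synchronous and block the event loop while they run. That is acceptable for a sketch, but a production implementation would more likely use an asynchronous driver or run the queries in a worker thread.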